package cn.jdemo.core.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.Arrays;

/**
 * Flink程序的执行过程：
 *
 * 1. 获取flink的执行环境(execution environment)
 * 2. 加载数据-- soure
 * 3. 对加载的数据进行转换 -- transformation
 * 4. 对结果进行保存或者打印 --sink
 * 5. 触发flink程序的执行(execute(),count(),collect(),print())，
 *  例如：调用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。
 *
 * ### 全局处理环境
 * @see ExecutionEnvironment
 * @see ExecutionEnvironment#execute()
 * @see StreamExecutionEnvironment
 * @see StreamExecutionEnvironment#execute()
 *
 * 数据源
 * @see DataStreamSource#addSink(SinkFunction) 
 * @see DataSource#count()
 * @see DataSource#collect()
 * @see DataSource#join(DataSet)
 * @see DataSource#groupBy(int...)
 *
 * @see DataSource#print()
 * @see DataSource#writeAsCsv(String)
 * @see DataSource#writeAsText(String)
 * @see DataSource#
 */
public class Flink {
    public static void main(String[] args) throws Exception {
        /* 批处理 */
        ExecutionEnvironment exe = ExecutionEnvironment.getExecutionEnvironment();
        /* source */
        DataSource<Integer> dataSource = exe.fromCollection(Arrays.asList(1, 2, 3));
        /* 执行 */
        dataSource.print();
        exe.execute();
        System.out.println("--------------------------------------");

        /* 流式处理 */
        StreamExecutionEnvironment streamExe = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> dataStreamSource = streamExe.fromCollection(Arrays.asList(1, 2, 3));
        dataStreamSource.print();
        streamExe.execute();
    }


}
